package org.budo.warehouse.logic.consumer.buffer;

import java.sql.Timestamp;
import java.util.Date;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

import javax.annotation.Resource;

import org.budo.graph.annotation.SpringGraph;
import org.budo.support.dao.page.Page;
import org.budo.support.servlet.util.QueryStringUtil;
import org.budo.time.Time;
import org.budo.warehouse.logic.api.AbstractDataConsumerWrapper;
import org.budo.warehouse.logic.api.DataConsumer;
import org.budo.warehouse.logic.api.DataEntry;
import org.budo.warehouse.logic.api.DataMessage;
import org.budo.warehouse.logic.util.DataMessageLogicUtil;
import org.budo.warehouse.logic.util.EntryBufferLogicUtil;
import org.budo.warehouse.service.api.IDataNodeService;
import org.budo.warehouse.service.api.IEntryBufferService;
import org.budo.warehouse.service.entity.DataNode;
import org.budo.warehouse.service.entity.EntryBuffer;
import org.budo.warehouse.service.entity.Pipeline;
import org.budo.warehouse.service.util.EntryBufferServiceUtil;
import org.springframework.aop.framework.AopContext;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;

/**
 * Data consumer wrapper that can buffer entries through {@link IEntryBufferService}
 * before handing them to the wrapped {@link DataConsumer}.
 * <p>
 * Buffering is controlled by two query-string parameters on the target data
 * node's url: {@code bufferSize} and {@code maxDelay} (added as seconds to a
 * timestamp). When either is missing or not positive, messages are passed
 * straight to the wrapped consumer.
 * <p>
 * The buffer is flushed when the number of not-yet-flushed entries reaches
 * {@code bufferSize}, or when {@code maxDelay} seconds have passed since the
 * newest not-yet-flushed entry was created. A one-shot timer re-checks the
 * buffer after {@code maxDelay} seconds.
 *
 * @author limingwei
 */
@Slf4j
@Getter
@Setter
@ToString
public class BufferedDataConsumerWrapper extends AbstractDataConsumerWrapper {
    private static final String BUFFER_SIZE_KEY = "bufferSize";

    private static final String MAX_DELAY_KEY = "maxDelay";

    /**
     * Time at which the already scheduled timer task will run, or {@code null}
     * when no task is pending. Per the original author's note, each pipeline
     * has its own instance of this class.
     * <p>
     * Declared volatile because it is written by the timer thread and read by
     * the threads that call {@link #consume(DataMessage)}.
     */
    private volatile Date timerTaskScheduledAt;

    @Resource
    private IDataNodeService dataNodeService;

    @Resource
    private IEntryBufferService entryBufferService;

    /**
     * Consumes a message, either directly or through the buffer.
     *
     * @param dataMessage the message to consume
     */
    @Override
    public void consume(DataMessage dataMessage) {
        Pipeline pipeline = this.getPipeline();
        Integer targetDataNodeId = pipeline.getTargetDataNodeId();
        DataNode targetDataNode = dataNodeService.findByIdCached(targetDataNodeId);

        String url = targetDataNode.getUrl();
        Integer maxDelay = QueryStringUtil.getParameterInteger(url, MAX_DELAY_KEY);
        Integer bufferSize = QueryStringUtil.getParameterInteger(url, BUFFER_SIZE_KEY);

        // Buffering not configured: hand the message to the wrapped consumer directly.
        if (null == maxDelay || maxDelay <= 0 || null == bufferSize || bufferSize <= 0) {
            DataConsumer dataConsumer = this.getDataConsumer();
            dataConsumer.consume(dataMessage);
            return;
        }

        // Call through the current AOP proxy; presumably so that the @SpringGraph
        // advice on addToBuffer is applied - TODO confirm.
        BufferedDataConsumerWrapper _this = (BufferedDataConsumerWrapper) AopContext.currentProxy();
        _this.addToBuffer(dataMessage);
    }

    /**
     * Stores the entries of the message in the entry buffer and then checks
     * whether the buffer has to be flushed.
     *
     * @param dataMessage the message whose entries are buffered
     */
    @SpringGraph
    public void addToBuffer(DataMessage dataMessage) {
        DataMessage dataMessageBuffer = DataMessageLogicUtil.toMessageBuffer(dataMessage);

        List<DataEntry> dataEntries = dataMessageBuffer.getDataEntries();
        if (dataEntries.isEmpty()) {
            log.debug("#44 dataEntryPojos={}, dataMessage={}", dataEntries, dataMessage);
            return;
        }

        Pipeline pipeline = this.getPipeline();
        List<EntryBuffer> entryBuffers = EntryBufferLogicUtil.messageToBuffers(dataMessageBuffer, pipeline);

        entryBufferService.insertBatch(entryBuffers);

        this.checkFlushBuffer();
    }

    /**
     * Checks the buffer on the calling thread, resolving the AOP proxy from
     * {@link AopContext} when a flush is needed.
     */
    private void checkFlushBuffer() {
        this.checkFlushBuffer(null);
    }

    /**
     * Flushes the buffer when it is full or its newest entry is older than
     * {@code maxDelay} seconds; otherwise schedules a one-shot timer that
     * repeats this check after {@code maxDelay} seconds.
     *
     * @param proxy the proxy through which {@link #doFlushBuffer()} is invoked,
     *              or {@code null} to look it up via {@link AopContext}. The
     *              timer task must pass the proxy explicitly because
     *              {@link AopContext#currentProxy()} throws
     *              {@link IllegalStateException} outside an AOP invocation,
     *              which is the case on the timer thread.
     */
    private void checkFlushBuffer(BufferedDataConsumerWrapper proxy) {
        Pipeline pipeline = this.getPipeline();
        Integer pipelineId = pipeline.getId();
        Integer targetDataNodeId = pipeline.getTargetDataNodeId();
        DataNode targetDataNode = dataNodeService.findByIdCached(targetDataNodeId);

        Integer count = entryBufferService.countNotFlushedByPipelineId(pipelineId);
        if (null == count || count < 1) {
            log.info("#100 count=" + count + ", pipelineId=" + pipelineId + ", targetDataNode=" + targetDataNode);
            return;
        }

        final BufferedDataConsumerWrapper _this = null != proxy //
                ? proxy //
                : (BufferedDataConsumerWrapper) AopContext.currentProxy();

        // Flush everything once the buffer has reached bufferSize.
        Integer bufferSize = QueryStringUtil.getParameterInteger(targetDataNode.getUrl(), BUFFER_SIZE_KEY);
        if (null != bufferSize && count >= bufferSize) {
            log.info("#72 bufferSize=" + bufferSize + ", count=" + count + ", pipeline=" + pipeline + ", targetDataNode=" + targetDataNode);
            _this.doFlushBuffer();
            return;
        }

        // Flush everything once maxDelay seconds have passed since the newest buffered entry.
        Integer maxDelay = QueryStringUtil.getParameterInteger(targetDataNode.getUrl(), MAX_DELAY_KEY, 0);
        Timestamp maxCreatedAt = entryBufferService.findNotFlushedMaxCreatedAtByPipelineId(pipelineId);
        if (null == maxCreatedAt) {
            // No not-flushed entry is left, e.g. another thread flushed between the two queries.
            log.info("#118 maxCreatedAt is null, pipelineId=" + pipelineId + ", targetDataNode=" + targetDataNode);
            return;
        }

        if (Time.now().isAfter( //
                Time.when(maxCreatedAt).plusSecond(maxDelay) //
        )) {
            log.info("#81 maxDelay=" + maxDelay + ", maxCreatedAt=" + maxCreatedAt + ", pipeline=" + pipeline + ", targetDataNode=" + targetDataNode);
            _this.doFlushBuffer();
            return;
        }

        // A timer task is already pending; it will repeat this check when it fires.
        if (null != this.timerTaskScheduledAt) {
            log.info("#144 timerTaskScheduledAt=" + this.timerTaskScheduledAt + ", this=" + this);
            return;
        }

        // Remember when the task will run.
        final Date scheduledAt = Time.now().plusSecond(maxDelay).toDate();
        this.timerTaskScheduledAt = scheduledAt;

        // Schedule the one-shot task on a daemon timer thread.
        final Timer timer = new Timer(true);
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                // Clear the marker first so that the check below may schedule a new
                // task when the buffer still must not be flushed, and so that a
                // failure cannot leave the marker set forever.
                BufferedDataConsumerWrapper.this.timerTaskScheduledAt = null;

                try {
                    BufferedDataConsumerWrapper.this.checkFlushBuffer(_this);
                } catch (RuntimeException e) {
                    // Nobody can handle an exception on the timer thread; log it.
                    log.error("#139 scheduled checkFlushBuffer error, this=" + BufferedDataConsumerWrapper.this, e);
                } finally {
                    // The timer is used only once; cancel it so its thread terminates.
                    timer.cancel();
                }
            }
        }, scheduledAt); // run at the given time
    }

    /**
     * Passes all not-yet-flushed entries of this pipeline to the wrapped
     * consumer as a single message and then marks them as flushed.
     * Does nothing when there are no such entries.
     */
    @SpringGraph
    public void doFlushBuffer() {
        Pipeline pipeline = this.getPipeline();

        // Buffered entries that have not been flushed yet.
        List<EntryBuffer> entryBuffers = entryBufferService.listNotFlushedByPipelineId(pipeline.getId(), Page.max());
        if (null == entryBuffers || entryBuffers.isEmpty()) {
            // Nothing to do, e.g. a concurrent flush already handled the entries.
            log.debug("#154 nothing to flush, pipeline={}", pipeline);
            return;
        }

        List<Integer> entryBufferIds = EntryBufferServiceUtil.getIds(entryBuffers);

        // Hand the buffered entries to the wrapped consumer.
        DataMessage dataMessage = EntryBufferLogicUtil.buffersToMessage(entryBuffers, pipeline.getTargetDataNodeId());
        this.getDataConsumer().consume(dataMessage);

        // Mark the entries as flushed, which empties the buffer.
        entryBufferService.updateFlushedAtByIds(Time.now().toTimestamp(), entryBufferIds);
    }
}
